1 /*
2  * Collie - An asynchronous event-driven network framework using Dlang development
3  *
4  * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd 
5  *
6  * Developer: putao's Dlang team
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module collie.codec.http.httptansaction;
12 import kiss.logger;
13 import collie.codec.http.codec.httpcodec;
14 import collie.codec.http.httpmessage;
15 import collie.codec.http.errocode;
16 import collie.codec.http.httpwritebuffer;
17 import kiss.net;
18 import kiss.event;
19 
20 import std.socket;
21 public 
22 import collie.codec.http.codec.wsframe;
23 
/// Direction a transaction's transport faces.
enum TransportDirection : ubyte
{
	/// Toward the client.
	DOWNSTREAM,

	/// Toward the origin application or data.
	UPSTREAM
}
28 
/**
 * Callback interface through which an HTTPTransaction reports ingress
 * events, errors, egress flow-control changes and WebSocket frames to the
 * application.
 *
 * Several callbacks below are documented as being deferred while ingress is
 * paused (pauseIngress()) and delivered after resumeIngress(); that contract
 * is carried over from the original design notes.
 */
interface HTTPTransactionHandler
{
	/**
	 * Called once per transaction. Tells the handler which transaction it
	 * should talk to and will receive callbacks from.
	 */
	void setTransaction(HTTPTransaction txn);

	/**
	 * Terminal callback, delivered once when the transaction is finished.
	 * It is delivered even if a read or write error happened earlier. The
	 * HTTPTransaction that issued this call must be treated as invalid once
	 * this function returns.
	 */
	void detachTransaction();

	/**
	 * Called at most once per transaction and usually the first ingress
	 * callback, although a read error may be reported before it. Deferred
	 * while ingress is paused.
	 */
	void onHeadersComplete(HTTPMessage msg);

	/**
	 * Delivers a piece of the ingress body. May be called multiple times
	 * per transaction. Deferred while ingress is paused.
	 */
	void onBody(const ubyte[] chain);

	/**
	 * Marks the beginning of a chunk of `length` bytes; onBody() follows.
	 * May be called multiple times per transaction, and `length` is greater
	 * than zero. Deferred while ingress is paused.
	 */
	void onChunkHeader(size_t length);

	/**
	 * Marks the end of a chunk. May be called multiple times per
	 * transaction. Deferred while ingress is paused.
	 */
	void onChunkComplete();

	/*
	 * Not ported yet: trailers callback. Trailers can be received once right
	 * before the EOM of a chunked HTTP/1.1 response, or multiple times per
	 * transaction from SPDY and HTTP/2.0 HEADERS frames.
	 */
//	void onTrailers(std::unique_ptr<HTTPHeaders> trailers) noexcept
//		= 0;

	/**
	 * Signals that the ingress message is complete; called at most once per
	 * transaction. No further normal ingress callbacks follow, but
	 * onEgress*() and onError() may still be invoked. The transaction stays
	 * valid, and work may continue on it, until detachTransaction() is
	 * called. Deferred while ingress is paused.
	 */
	void onEOM();

	/**
	 * Reports that an error has occurred; can be called at any time before
	 * detachTransaction(). The error is described by the given
	 * HTTPErrorCode. If only ingress is affected, it MAY still be possible
	 * to send egress.
	 */
	void onError(HTTPErrorCode erromsg);

	/**
	 * Invoked when the remote side's receive buffer fills up, so the
	 * handler can attempt to stop sending to the remote side.
	 */
	void onEgressPaused();

	/**
	 * Invoked when the remote side has resumed reading and the handler can
	 * continue to send data.
	 */
	void onEgressResumed();

	/**
	 * Delivers a WebSocket frame received on this transaction.
	 */
	void onWsFrame(ref WSFrame wsf);

	/**
	 * Asks the handler about a protocol upgrade for the given message.
	 * NOTE(review): presumably returning true accepts the upgrade to
	 * `protocol` -- confirm against the session code that calls this.
	 */
	bool onUpgtade(CodecProtocol protocol, HTTPMessage msg);
}
123 
/**
 * One HTTP exchange on a codec stream.
 *
 * The transaction sits between a Transport (the session/connection that
 * carries the bytes) and an HTTPTransactionHandler (the application).
 * Ingress events are forwarded to the handler when one is set; egress calls
 * are forwarded to the transport when one is attached. Both forwarding
 * directions silently do nothing when the respective peer is null.
 */
class HTTPTransaction
{
	/**
	 * Operations the transaction requires from whatever carries it.
	 * Every per-transaction call receives the calling transaction.
	 */
	interface Transport
	{
		void pauseIngress(HTTPTransaction txn);
		
		void resumeIngress(HTTPTransaction txn);
		
		void transactionTimeout(HTTPTransaction txn);
		
		void sendHeaders(HTTPTransaction txn,
			HTTPMessage headers,
			bool eom);
		
		size_t sendBody(HTTPTransaction txn,
			in ubyte[],
			bool eom);
		
		size_t sendChunkHeader(HTTPTransaction txn,
			size_t length);
		
		size_t sendChunkTerminator(HTTPTransaction txn);

		
		size_t sendEOM(HTTPTransaction txn);

		void socketWrite(HTTPTransaction txn,StreamWriteBuffer buffer);

//		size_t sendAbort(HTTPTransaction txn,
//			HTTPErrorCode statusCode);

		size_t sendWsData(HTTPTransaction txn,OpCode code,ubyte[] data);
//		size_t sendPriority(HTTPTransaction txn,
//			const http2::PriorityUpdate& pri);
//		
//		size_t sendWindowUpdate(HTTPTransaction txn,
//			uint32_t bytes);
		
		void notifyPendingEgress();
		
		void detach(HTTPTransaction txn);
		
//		void notifyIngressBodyProcessed(uint32_t bytes);
//		
//		void notifyEgressBodyBuffered(int64_t bytes);
		
		Address getLocalAddress();
		
		Address getPeerAddress();

		
		HTTPCodec getCodec();
		
		bool isDraining();

	}

	/**
	 * Params:
	 *   direction = whether this transaction faces upstream or downstream
	 *   id        = codec stream id of this transaction
	 *   seqNo     = sequence number reported by getSequenceNumber()
	 */
	this(TransportDirection direction, HTTPCodec.StreamID id,uint seqNo)
	{
		// The direction must be stored: without this assignment _direction
		// keeps its default (DOWNSTREAM) and isUpstream() is always false.
		_direction = direction;
		_id = id;
		_seqNo = seqNo;
	}

	/// Handler that receives this transaction's callbacks (may be null).
	@property HTTPTransactionHandler handler(){return _handler;}
	/// Install (or clear, with null) the handler.
	@property void handler(HTTPTransactionHandler han){_handler = han;}

	/// Codec stream id; same value as getID().
	@property streamID(){return _id;}
	/// Attach (or clear, with null) the transport.
	@property transport(Transport port){_transport = port;}
	/// Transport currently attached (may be null).
	@property Transport transport(){return _transport;}

	/// True when the transaction was created with TransportDirection.UPSTREAM.
	bool isUpstream() const {
		return _direction == TransportDirection.UPSTREAM;
	}
	
	/// True when the transaction was created with TransportDirection.DOWNSTREAM.
	bool isDownstream() const {
		return _direction == TransportDirection.DOWNSTREAM;
	}

	/// Sequence number passed to the constructor.
	uint getSequenceNumber() const { return _seqNo; }

	/// Codec stream id passed to the constructor.
	HTTPCodec.StreamID getID() const { return _id; }


	/**
	 * Local address as reported by the transport.
	 * Returns: null when no transport is attached (for example after
	 *          onDelayedDestroy() has detached it).
	 */
	Address getLocalAddress()
	{
		return (_transport !is null) ? _transport.getLocalAddress() : null;
	}
	
	/**
	 * Peer address as reported by the transport.
	 * Returns: null when no transport is attached (for example after
	 *          onDelayedDestroy() has detached it).
	 */
	Address getPeerAddress()
	{
		return (_transport !is null) ? _transport.getPeerAddress() : null;
	}

	/**
	 * Invoked by the session when the ingress headers are complete.
	 * For an upstream transaction receiving a response, the status code is
	 * remembered before the handler is notified.
	 */
	void onIngressHeadersComplete(HTTPMessage msg)
	{
		// logDebug("onIngressHeadersComplete handle is ", (handler is null));
		if(isUpstream() && msg.isResponse()) {
			_lastResponseStatus = msg.statusCode;
		}
		if(_handler)
			_handler.onHeadersComplete(msg);
	}
	
	/**
	 * Invoked by the session when some or all of the ingress entity-body has
	 * been parsed. `padding` is accepted but not used here.
	 */
	void onIngressBody(const ubyte[] chain, ushort padding)
	{
		if(_handler)
			_handler.onBody(chain);
	}
	
	/**
	 * Invoked by the session when a chunk header has been parsed.
	 */
	void onIngressChunkHeader(size_t length)
	{
		if(_handler)
			_handler.onChunkHeader(length);
	}
	
	/**
	 * Invoked by the session when the CRLF terminating a chunk has been parsed.
	 */
	void onIngressChunkComplete()
	{
		if(_handler)
			_handler.onChunkComplete();
	}

	/**
	 * Invoked by the session when the ingress message is complete.
	 */
	void onIngressEOM()
	{
		if(_handler)
			_handler.onEOM();
	}

	/**
	 * Forward an error code to the handler's onError(), if a handler is set.
	 */
	void onErro(HTTPErrorCode erro)
	{
		if(_handler)
			_handler.onError(erro);
	}

	/**
	 * Schedule or refresh the timeout for this transaction.
	 * Currently a no-op.
	 */
	void refreshTimeout() {}

	/**
	 * Timeout callback for this transaction. Currently a no-op.
	 */
	void timeoutExpired() {}

	/**
	 * Hand the egress message headers to the transport without the EOM flag.
	 * Per the original design notes, this should be called once per message
	 * unless the first headers sent indicate a 1xx status, and the data is
	 * not necessarily written to the wire immediately.
	 *
	 * sendHeaders does not set the EOM flag, sendHeadersWithEOM does;
	 * sendHeadersWithOptionalEOM backs both of them.
	 *
	 * Params:
	 *   headers = message headers
	 */
	void sendHeaders(HTTPMessage headers)
	{
		sendHeadersWithOptionalEOM(headers,false);
	}

	/// Same as sendHeaders(), but marks the headers as end-of-message.
	void sendHeadersWithEOM(HTTPMessage headers)
	{
		sendHeadersWithOptionalEOM(headers,true);
	}

	/// Forward the headers and EOM flag to the transport, if one is attached.
	void sendHeadersWithOptionalEOM(HTTPMessage headers, bool eom)
	{
		if(transport)
			transport.sendHeaders(this,headers,eom);
	}

	/**
	 * Hand part or all of the egress message body to the transport, if one
	 * is attached. May be called zero or more times per message. The
	 * transport's returned byte count is discarded.
	 *
	 * Params:
	 *   body_ = message body data; per the original design notes the
	 *           transport applies any necessary protocol framing, such as
	 *           chunk headers
	 *   iseom = true when this is the final piece of the message
	 */
	void sendBody(in ubyte[] body_, bool iseom = false){
		if(transport)
			transport.sendBody(this,body_, iseom);
	}
	
	/**
	 * Ask the transport to write any protocol framing required for the
	 * subsequent call(s) to sendBody().
	 *
	 * Params:
	 *   length = length in bytes of the body data to follow
	 */
	void sendChunkHeader(size_t length) {
		if(transport)
			transport.sendChunkHeader(this,length);
	}

	/// Pass a write buffer to the transport's socketWrite(), if attached.
	void socketWrite(StreamWriteBuffer buffer){
		if(transport)
			transport.socketWrite(this,buffer);
	}
	
	/**
	 * Ask the transport to write any protocol syntax needed to terminate
	 * the chunk begun by the last call to sendChunkHeader().
	 */
	void sendChunkTerminator() {
		if(transport)
			transport.sendChunkTerminator(this);
	}

	/**
	 * Finalize the egress message; depending on the protocol used by the
	 * transport, this may involve sending an explicit "end of message"
	 * indicator.
	 *
	 * Note: per the original design notes, this should be called once per
	 *       message.
	 */
	void sendEOM(){
		if(transport)
			transport.sendEOM(this);
	}

	/**
	 * Send a "408 Request Timeout" response with "Connection: close" and
	 * the EOM flag set. Does nothing when no transport is attached.
	 */
	void sendTimeOut()
	{
		if(!transport) return;
		import collie.codec.http.headers;
		// NOTE(review): msg is scope-allocated, so this assumes the transport
		// does not keep a reference to it after sendHeaders() returns --
		// TODO confirm against the Transport implementations.
		scope HTTPMessage msg = new HTTPMessage();
		msg.statusCode(408);
		msg.statusMessage("Request Timeout");
		msg.getHeaders.add(HTTPHeaderCode.CONNECTION,"close");
		sendHeadersWithEOM(msg);
	}

	/// Send WebSocket data with the given opcode through the transport, if attached.
	void sendWsData(OpCode code,ubyte[] data)
	{
		if(transport)
			transport.sendWsData(this,code,data);
	}

	/// Forward a received WebSocket frame to the handler, if one is set.
	void onWsFrame(ref WSFrame wsf){
		logDebug(".....");
		if(_handler)
			_handler.onWsFrame(wsf);
	}

	/**
	 * Forward a protocol-upgrade query to the handler.
	 * Returns: the handler's answer, or false when no handler is set.
	 */
	bool onUpgtade(CodecProtocol protocol, HTTPMessage msg){
		if(_handler)
			return _handler.onUpgtade(protocol, msg);

		return false;
	}

package:
	/**
	 * Tear the transaction down: notify and drop the handler, then detach
	 * from and drop the transport. Only the first call has any effect;
	 * later calls return immediately.
	 */
	void onDelayedDestroy()
	{
		// logDebug("deleting is ", deleting);
		if(deleting) return;
		deleting = true;
		if(_handler) {
			_handler.detachTransaction();
			_handler = null;
		}
		if(_transport) {
			_transport.detach(this);
			_transport = null;
		}
	}
private:
	HTTPCodec.StreamID _id;
	Transport _transport;
	HTTPTransactionHandler _handler;
	TransportDirection _direction;
	uint _seqNo;

	// Set by onDelayedDestroy() so teardown runs at most once.
	bool deleting = false;
private:
	// Status code of the last response seen on an upstream transaction.
	ushort _lastResponseStatus;
}
433